NSQ 是什么
NSQ 是什么
NSQ(NSQ Messaging System) 是一种实时分布式消息传递平台,用于在分布式系统中进行可靠的实时消息传递。它由 Bitly(一家互联网公司)开发,用于处理大规模的消息流。
NSQ 的设计目标是提供简单易用、高性能和可靠的消息传递机制。它采用去中心化的架构,其中消息生产者将消息发送到NSQ集群,而消息消费者从集群中接收和处理消息。NSQ提供了低延迟、高吞吐量和水平可扩展的特性,适用于需要实时响应的应用场景。
NSQ 的关键特性包括:
- 分布式和去中心化架构:NSQ集群由多个节点组成,每个节点都是对等的,没有单点故障。
- 实时消息传递:NSQ专注于实时消息传递,能够以低延迟和高吞吐量处理消息。
- 可靠性:NSQ通过持久化消息和多副本复制机制提供可靠的消息传递,并支持消息重试和故障恢复。
- 水平可扩展:NSQ支持水平扩展,可以通过添加更多的节点来增加吞吐量和容量。
- 低配置和易用性:NSQ提供简单的命令行工具和易于理解的配置选项,使得使用和管理变得相对简单。
NSQ 是一种面向实时消息传递的分布式消息传递平台,具有高性能、可靠性和简单易用的特点,适用于需要实时处理消息的分布式系统。
NSQ 组件
NSQ 由 3 个守护进程组成:
- nsqd 是接收、队列和传送消息到客户端的守护进程。
- nsqlookupd 是管理的拓扑信息,并提供了最终一致发现服务的守护进程。
- nsqadmin 是一个 Web UI 来实时监控集群(和执行各种管理任务)。
资源消耗
NSQ
进程 | 启动时占用 |
---|---|
nsqd | 9.2MB |
nsqlookup | 8.5MB |
Kafka
column0 | column1 |
---|---|
进程 | 启动时占用 |
kafka | 299MB |
zookeeper | 58MB |
运行与维护
\ | NSQ | Kafka |
---|---|---|
依赖 | 无 | Linux基础包、bash、jdk、java |
耦合 | 无!能以nsqd单进程提供完整服务,只在多节点分布式模式下需要nsqlookup | 依赖 zookeeper |
日志 | 标准输出,自行重定向 | zookeeper 1份日志,kafka 7份日志,其中两份日志按小时自动切割 |
配置 | 10项左右,默认即是最优 | 10多个独立配置文件,数百个配置项 |
性能优化 | 默认开启 pprof。支持web可视化实时观测内存、协程等动态 | 无 |
异常排查 | 错误日志中的栈,源码量小。不依赖网络问答也能在短时间内找出问题 | 错误日志中的栈,深度的栈,巨量源码,排查需要深入了解其原理,大量阅读源码。否则只能通过互联网、查阅前人经验或大师级人脉。 |
NSQ 集群
虽然机器故障是小概率事件,但是也无法彻底避免,我们要提供高可用的服务,就必要考虑机器故障。NSQ 采用的是一个完全分布式的拓扑结构,非常适合构建起一个高可用的消息集群。
要实现高可用,有多个等级,比如部分兼容机器故障、兼容单机房故障、兼容地域主干网络故障等等。绝大部分的业务场景下,做到兼容部分机器故障是基本要求,这里我们主要总结下如何兼容部署机器故障,如果要做到更高的可用性部署拓扑也类似,就是多机房、多地域部署。
nsqd 的数量决定整个集群的吞吐能力,NSQ 具备非常好的消息处理性能,具体按实际业务量级决定部署 nsqd 节点规模,但考虑到可用性, 最少不要少于 3 个 nsqd 节点,同样也不要少于 3 个 nsdlookupd 节点。
示例部署拓扑:
启动 3 个 nsdlookupd,所有 nsqd 都连接到这三个 nsdlookupd。生成消息是直接通过负载均衡写入到 nsqd,如果那个 nsqd 挂掉,生产者就无法继续往这个 nsqd 写消息了。消费消息通过 nsdlookupd 做服务发现,配置 3 个 +nsdlookupd,其中某个挂掉不会影响消息消费,nsqd 仍然可以通过其他 nsdlookupd 被发现。nsqadmin 是个 web-ui 管理后台,无任何本地状态,单机部署多机部署都可以。
使用 Docker Compose 来搭建 NSQ 集群时,可以按照以下步骤进行操作:
- 创建一个名为
docker-compose.yml
的文件,并使用以下内容:
version: "3"
services:
nsqd1:
image: nsqio/nsq:v1.2.0
command: /nsqd --broadcast-address=nsqd1 --lookupd-tcp-address=lookupd:4160
ports:
- "4150:4150"
- "4151:4151"
networks:
- nsqnet
volumes:
- ./data/nsqd1:/data
nsqd2:
image: nsqio/nsq:v1.2.0
command: /nsqd --broadcast-address=nsqd2 --lookupd-tcp-address=lookupd:4160
ports:
- "4250:4150"
- "4251:4151"
networks:
- nsqnet
volumes:
- ./data/nsqd2:/data
lookupd:
image: nsqio/nsq:v1.2.0
command: /nsqlookupd
ports:
- "4160:4160"
- "4161:4161"
networks:
- nsqnet
networks:
nsqnet:
上述配置使用了 nsqio/nsq:v1.2.0
镜像,并创建了三个服务:nsqd1
、nsqd2
和 lookupd
。其中,nsqd1
和 nsqd2
是两个 NSQD 实例,lookupd
是 NSQ 的查找服务。
- 创建一个名为
docker-compose.override.yml
的文件,并使用以下内容:
version: "3"
services:
nsqd1:
environment:
- NSQ_LOOKUPD_TCP_ADDRESS=lookupd:4160
- NSQ_NSQD_TCP_ADDRESS=0.0.0.0:4150
- NSQ_HTTP_ADDRESS=0.0.0.0:4151
nsqd2:
environment:
- NSQ_LOOKUPD_TCP_ADDRESS=lookupd:4160
- NSQ_NSQD_TCP_ADDRESS=0.0.0.0:4150
- NSQ_HTTP_ADDRESS=0.0.0.0:4151
上述配置为每个 NSQD 实例设置了环境变量,指定了各自的 NSQD 和 HTTP 地址,并告知 NSQD 实例连接到 lookupd
服务。
- 在命令行中进入包含上述两个文件的目录,然后运行以下命令启动 NSQ 集群:
docker-compose up -d
这将启动三个容器:两个 NSQD 实例和一个 lookupd 实例。
- 现在,你可以通过以下地址访问 NSQD 实例和 lookupd 实例:
- NSQD1:
127.0.0.1:4150
(TCP) 和127.0.0.1:4151
(HTTP) - NSQD2:
127.0.0.1:4250
(TCP) 和127.0.0.1:4251
(HTTP) - lookupd:
127.0.0.1:4160
(TCP) 和127.0.0.1:4161
(HTTP)
现在,你已经成功搭建了一个简单的 NSQ 集群,其中包含两个 NSQD 实例和一个 lookupd 实例。你可以使用这个集群来发送和接收消息,并享受 NSQ 的分布式消息处理能力。记得根据实际需求进行配置和调整,例如添加更多的 NSQD 实例或使用负载均衡器来平衡流量。
然后访问的时候是使用 lookupd 去取得具体的实例地址
consumer, err := nsq.NewConsumer(topic, "my_channel", config)
consumer.ConnectToNSQLookupds([]string{"lookupd:4160"})
在这种配置下,消费者将连接到 lookupd 实例,并从它那里获取可用的 NSQD 实例列表。如果某个 NSQD 实例不可用,消费者会自动切换到其他可用的 NSQD 实例来接收消息。
使用 Docker 部署服务
# docker-compose.yml
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "4160"
- "31001:4161"
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
ports:
- "31041:4150"
- "4151"
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "31011:4171"
NSQD 默认监听在 TCP 端口 4150 上。NSQD 是 NSQ 的守护进程,它负责接收、存储和分发消息。当生产者通过 NSQ 库将消息发送到 NSQD 时,它们会连接到 NSQD 的 4150 端口。同样,消费者通过连接到 NSQD 的 4150 端口来消费消息。
需要注意的是,4150 端口只是 NSQD 默认的监听端口,可以在配置文件中自定义 NSQD 的监听端口。在上述示例中,如果在 NSQD 配置中将监听端口更改为其他值,需要相应地更新生产者和消费者代码中的端口。(例如上面的例子就映射成了 31041)
docker-compose up -d
最后检查是否成功启动了
curl http://127.0.0.1:31001/ping
范围 http://127.0.0.1:31011/
来访问 admin UI 界面
在 Golang 上消费和发送消息
如下例子
package main
import (
"fmt"
"log"
"time"
"github.com/nsqio/go-nsq"
)
// 消费者处理消息的回调函数
type MessageHandler struct{}
func (h *MessageHandler) HandleMessage(message *nsq.Message) error {
fmt.Printf("Received message: %s\n", message.Body)
return nil
}
func main() {
// 创建一个生产者
producer, err := nsq.NewProducer("localhost:31041", nsq.NewConfig())
if err != nil {
log.Fatal(err)
}
// 向指定主题发送消息
topic := "my_topic"
// 创建一个消费者
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer(topic, "my_channel", config)
if err != nil {
log.Fatal(err)
}
// 设置消息处理函数
consumer.AddHandler(&MessageHandler{})
go func() {
for i := 0; i < 100; i++ {
err = producer.Publish(topic, []byte(fmt.Sprintf("Hello, NSQ!, %d", i)))
time.Sleep(10 * time.Millisecond)
if err != nil {
log.Fatal(err)
}
}
consumer.Stop()
}()
// 连接到NSQD
err = consumer.ConnectToNSQD("localhost:31041")
if err != nil {
log.Fatal(err)
}
// 运行消费者,直到接收到中断信号
<-consumer.StopChan
// 关闭生产者连接
producer.Stop()
}
输出:
2023/06/14 09:03:34 INF 1 (localhost:31041) connecting to nsqd
2023/06/14 09:03:34 INF 2 [my_topic/my_channel] (localhost:31041) connecting to nsqd
Received message: Hello, NSQ!, 1
.....
Received message: Hello, NSQ!, 98
Received message: Hello, NSQ!, 99
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] stopping...
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) received CLOSE_WAIT from nsqd
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) beginning close
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) readLoop exiting
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) breaking out of writeLoop
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) writeLoop exiting
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) finished draining, cleanup exiting
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] (localhost:31041) clean close complete
2023/06/14 09:10:54 WRN 2 [my_topic/my_channel] there are 0 connections left alive
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] stopping handlers
2023/06/14 09:10:54 INF 2 [my_topic/my_channel] rdyLoop exiting
2023/06/14 09:10:54 INF 1 (localhost:31041) stopping
2023/06/14 09:10:54 INF 1 (localhost:31041) exiting router
nsq.NewConsumer 第二个入参
在 NSQ 中,消费者通过订阅主题和通道来接收消息。nsq.NewConsumer
的第二个参数是通道名称(channel name),它用于标识消费者所属的通道。
在 NSQ 中,每个主题可以有多个通道,每个通道都是独立的消息消费者组。通过使用不同的通道名称,可以实现消息的负载均衡和并行处理。每个通道中的消费者将独立地接收和处理主题中的消息,不同通道的消费者之间不会互相干扰。
当多个消费者同时订阅同一个主题时,如果它们使用相同的通道名称,那么它们将共享接收到的消息。每条消息只会被同一个通道中的一个消费者处理。这种方式适用于实现消息的负载均衡和水平扩展。
如果你希望每个消费者都独立地接收主题中的所有消息,可以为每个消费者使用不同的通道名称。这样,每个通道中的消费者将独立处理消息,实现并行处理的能力。
示例代码中的 nsq.NewConsumer
的第二个参数就是通道名称,你可以根据需要选择合适的通道名称来实现你的消费者逻辑。
consumer, err := nsq.NewConsumer("my_topic", "my_channel", config)
在上述代码中,消费者使用 my_channel
作为通道名称,用于接收 my_topic
主题中的消息。你可以根据需要将通道名称更改为你自己的名称,确保消费者逻辑的正确运行。